El Dockerfile utilizado para esta segunda parte de la tarea es el mismo que se utilizó en la primera. El Dockerfile se encuentra tanto en la carpeta de la primera parte como en la de la segunda parte.

Ejercicio 1: Con los datos de ECOBICI 2010 a 2017, ¿cuántos registros hay por cicloestación?

El output del mapreduce se muestra en el archivo ‘output1.txt’ en la carpeta ejercicio_1. En la misma se encuentran los archivos mapper1.py y reducer1.py.

Códigos

Mapper.py

#!/usr/bin/env python
import sys
import re
 
for line in sys.stdin:
    linea = re.split(',',line)
    print(linea[13] + "\t1")

Reducer.py

#!/usr/bin/env python
import sys
previous = None
sum = 0
for line in sys.stdin:
  key, value = line.split('\t')
  if key != previous:
      if previous is not None:
        print(previous + '\t' + str(sum))
      previous = key
      sum = 0
  sum += int(value)
print(previous + '\t' + str(sum))

Head del output.txt

head -10 /home/daniel/Documents/DataScience/Repos/Sem2/metodos_gran_escala/alumnos/cristian_challu/tarea_3/parte_2/ejercicio_1/output1.txt  
## 1 RIO BALSAS-RIO SENA    476904
## 10 REFORMA-RAMIREZ   291130
## 100 20 DE NOVIEMBRE-MESONES  22924
## 101 1ER. CALLEJ<U+00D3>N DE MESONES-MESONES  29120
## 102 ECHEVESTE-BOLIVAR    42642
## 103 JOAQU<U+00CD>N VEL<U+00C1>ZQUEZ DE LE<U+00D3>N-GARC<U+00CD>A ICAZBALCETA 32382
## 104 SAN JERONIMO-ISABEL LA CATOLICA  54598
## 105 SAN JERONIMO-5 DE FEBRERO    35906
## 106 SAN JERONIMO-JOSE MARIA PINO SUAREZ  88989
## 107 TOLSA-BALDERAS   116998

Imagen del jps del clúster

Nota: dadas las capacidades técnicas del tercer equipo que quisimos unir al clúster lo tuvimos que eliminar en el proceso de Map-Reduce pues generaba errores de memoria. Sin embargo, como se puede observar en la primera parte de tarea, si logramos levantar el YARN con tres equipos.

Master

Slave

Imagen del localhost:8088 seleccionando la opción de nodos:

Imagen del localhost:8088 seleccionando la opción FINISHED para ver que los jobs corrieron correctamente

Producto de la línea de comando mostrando el mapreduce exitoso:

Ejercicio 2: Con los datos de vuelos retrasados en USA hacer un join del lado del mapper con flights, airports y airlines. Primero intenta una sola llave o flights o airports

Una muestra del output del mapreduce se muestra en el archivo ‘output_sample.txt’ carpeta ejercicio_2 pues el archivo completo rondaba los 800 MB. En la misma se encuentran los archivos mapper-airline-flights.py y mapper-joined-airports.py. Estos archivo lleva a cabo el join de las tablas de airlines y flights y el producto de este primer join con la tabla de airports, respectivamente. No es necesario crear un reducer para este tipo de join pues toda la ejecución se lleva a cabo en el Mapper.

Códigos

El primer mapper desarrollaa el join entre la tabla de flights.csv y airlines.csv y el segundo desarrolla el join entre el producto del mapper anterior y la tabla de airports

Mapper-airline-flights.py

#!/usr/bin/env python

import sys

airline_dict = {}
# Lee la tabla de airlines a memoria, pues es pequeña
airlines = open('airlines.csv','r')
# Llena un diccionario con los valores posibles de la tabla
for line in airlines:
   line = line.strip()
   splits = line.split(",")
   airline_dict[splits[0]] = splits[1]
# Cierra el archivo
airlines.close()
# Itera línea por línea agregando el valor del nombre de la aerolínea de acuerdo con el código. Si no encuentra el código de la aerolínea en la tabla de airlines agrega un valor 'NA'.
for line in sys.stdin:
    line = line.strip()
    splits = line.split(",")
    if (splits[4] in airline_dict):
        splits.insert(5,airline_dict[splits[4]])
    else:
        splits.insert(5,'NA')
    print(','.join(splits))

Mapper-airline-flights.py

#!/usr/bin/env python

import sys

airports_dict = {}
# Lee la tabla de airports
airports = open('airports.csv','r')
# Llena un diccionario con los valores posibles de la tabla
for line in airports:
   line = line.strip()
   splits = line.split(",")
   airports_dict[splits[0]] = splits[1]

airports.close()
# Itera línea por línea agregando el valor del nombre del aeropuerto de acuerdo con el código. Si no encuentra el código del aeropuerto en la tabla de airports agrega un valor 'NA'.
for line in sys.stdin:
    line = line.strip()
    splits = line.split(",")
    if (splits[8] in airports_dict):
        splits.insert(9,airports_dict[splits[8]])
    else:
        splits.insert(9,'NA')
    print(','.join(splits))

Head del output.csv

Únicamente se subió una muestra de la tabla resultante del join porque la tabla completa pesa alrededor de 1 GB, que supera la cantidad que se debería de subir a github. En la carpeta se adjunta una muestra de 500 líneas de la tabla final.

head -10 /home/daniel/Documents/DataScience/Repos/Sem2/metodos_gran_escala/alumnos/cristian_challu/tarea_3/parte_2/ejercicio_2/output_sample.txt  
## 2015,2,23,1,AS,Alaska Airlines Inc.,585,N607AS,SNA,John Wayne Airport (Orange County Airport),PDX,1835,1832,-3,21,1853,139,152,127,859,2100,4,2054,2104,10,0,0,,,,,,
## 2015,12,28,1,EV,Atlantic Southeast Airlines,4157,N14162,IAH,George Bush Intercontinental Airport,XNA,1401,1405,4,25,1430,94,101,64,438,1534,12,1535,1546,11,0,0,,,,,,
## 2015,1,4,7,EV,Atlantic Southeast Airlines,6116,N12201,TUL,Tulsa International Airport,DEN,0932,0922,-10,12,0934,116,129,94,541,1008,23,1028,1031,3,0,0,,,,,,
## 2015,11,9,1,UA,United Air Lines Inc.,1454,N35271,PBI,Palm Beach International Airport,EWR,1217,1217,0,13,1230,174,152,131,1023,1441,8,1511,1449,-22,0,0,,,,,,
## 2015,6,4,4,AA,American Airlines Inc.,1022,N867AA,LAS,McCarran International Airport,DFW,1420,1420,0,11,1431,169,173,128,1055,1839,34,1909,1913,4,0,0,,,,,,
## 2015,9,27,7,DL,Delta Air Lines Inc.,2184,N963AT,ATL,Hartsfield-Jackson Atlanta International Airport,SDF,1454,1456,2,12,1508,82,69,51,321,1559,6,1616,1605,-11,0,0,,,,,,
## 2015,4,6,1,WN,Southwest Airlines Co.,4206,N656SW,BWI,Baltimore-Washington International Airport,BDL,1645,1726,41,15,1741,65,65,47,283,1828,3,1750,1831,41,0,0,,0,0,10,31,0
## 2015,6,1,1,EV,Atlantic Southeast Airlines,4684,N17984,SDF,Louisville International Airport (Standiford Field),IAH,1831,2143,192,11,2154,147,133,107,788,2241,15,1958,2256,178,0,0,,0,0,0,178,0
## 2015,11,24,2,EV,Atlantic Southeast Airlines,4433,N11536,PIA,General Wayne A. Downing Peoria International Airport,IAH,0557,0555,-2,24,0619,152,147,117,802,0816,6,0829,0822,-7,0,0,,,,,,
## 2015,12,14,1,AA,American Airlines Inc.,1964,N708UW,CLT,Charlotte Douglas International Airport,MCI,1140,1137,-3,14,1151,160,146,128,808,1259,4,1320,1303,-17,0,0,,,,,,

Imagen del jps del clúster

Nota: dadas las capacidades técnicas del tercer equipo que quisimos unir al clúster lo tuvimos que eliminar en el proceso de Map-Reduce pues generaba errores de memoria. Sin embargo, como se puede observar en la primera parte de la tarea, si logramos levantar el YARN con tres equipos.

Master

Slave

En este se muestran los procesos que corren durante la ejecución del Map-Reduce en el esclavo.

Imagen del localhost:8088 seleccionando la opción de nodos:

Imagen del localhost:8088 seleccionando la opción FINISHED para ver que los jobs corrieron correctamente

Producto de la línea de comando mostrando el mapreduce exitoso:

Ejercicio 3: Con los datos de vuelos retrasados en USA hacer un join del lado del reducer con flights, airports y airlines. Primero intenta una sola llave o flights o airports

Una muestra del output del mapreduce se muestra en el archivo ‘output3_sample.txt’ en la misma carpeta ejercicio_3. En la misma se encuentran los archivos mapper3.py, mapper3_2.py, reducer3.py y reducer3_2.py. Estos archivos ejecutan el join de las tres tablas en dos fases. El mapper3.py y reducer3.py se encargan de hacer el join entre flights.csv y airlines.csv. Los archivos mapper3_2.py y reducer3_2.py se encargan de hacer el join de la tabla producto de la operación anterior la tabla de airports.csv.

Códigos

Estos mapper y reducer desarrollan el join entre la tabla de flights.csv y airlines.csv

Mapper.py

#!/usr/bin/env python

import sys

for line in sys.stdin:
    line = line.strip()
    splits = line.split(',')
    if len(splits) == 2:
        print(splits[0],',','0',',',splits[1],sep='')
    else:
        print(splits[4],',','NA',end='',sep='')
        for i in range(31):
            if i == 4:
                continue
            print(',',splits[i],sep='',end='')
        print('')

Reducer.py

#!/usr/bin/env python
import sys
import string

last_airline_id = None
cur_airline_name = "-"

for line in sys.stdin:
    line = line.strip()
    linea = line.split(',')
    if not last_airline_id or last_airline_id != linea[0]:
        last_airline_id = linea[0]
        cur_airline_name = linea[2]
    elif linea[0] == last_airline_id:
        linea[1] = cur_airline_name
        print(linea[1],sep='',end='')
        for i in range(2,len(linea)):
            print(',',linea[i],sep='',end='')
        print('')

Los siguientes mapper y reducer realizan el join entre la tabla producto del join anterior y la tabla de airports

Mapper.py

#!/usr/bin/env python

import sys

for line in sys.stdin:
    line = line.strip()
    splits = line.split(',')
    if len(splits) == 7:
        print(splits[0],',','0',',',splits[1],',',splits[2],',',splits[3],',',splits[4],',',splits[5],',',splits[6],sep='')
    else:
        print(splits[7],',','NA,','NA,','NA,','NA,','NA,','NA',end='',sep='')
        for i in range(31):
            if i == 7:
                continue
            print(',',splits[i],sep='',end='')
        print('')

Reducer.py

#!/usr/bin/env python
import sys
import string

last_airport_id = None
cur_1 = "-"
cur_2 = "-"
cur_3 = "-"
cur_4 = "-"
cur_5 = "-"
cur_6 = "-"

for line in sys.stdin:
    line = line.strip()
    linea = line.split(',')
    if not last_airport_id or last_airport_id != linea[0]:
        last_airport_id = linea[0]
        cur_1 = linea[2]
        cur_2 = linea[3]
        cur_3 = linea[4]
        cur_4 = linea[5]
        cur_5 = linea[6]
        cur_6 = linea[7]
    elif linea[0] == last_airport_id:
        linea[1] = cur_1
        linea[2] = cur_2
        linea[3] = cur_3
        linea[4] = cur_4
        linea[5] = cur_5
        linea[6] = cur_6
        print(linea[1],sep='',end='')
        for i in range(2,len(linea)):
            print(',',linea[i],sep='',end='')
        print('')

Head del output.csv

Únicamente se subió una muestra de la tabla resultante del join porque la tabla completa pesa alrededor de 1 GB, que supera la cantidad que se debería de subir a github. En la carpeta se adjunta una muestra de 500 líneas de la tabla final.

head -10 /home/daniel/Documents/DataScience/Repos/Sem2/metodos_gran_escala/alumnos/cristian_challu/tarea_3/parte_2/ejercicio_3/output3_sample.csv  
## Salt Lake City International Airport,Salt Lake City,UT,USA,40.78839,-111.97777,Southwest Airlines Co.,2015,2,24,2,4210,N8640D,MDW,1605,1603,-2,11,1614,185,183,157,1259,1951,15,2010,2006,-4,0,0,,,,,,   
## Honolulu International Airport,Honolulu,HI,USA,21.31869,-157.92241,Alaska Airlines Inc.,2015,3,22,7,834,N535AS,PDX,1120,1111,-9,20,1131,330,328,305,2603,1936,3,1950,1939,-11,0,0,,,,,,  
## Detroit Metropolitan Airport,Detroit,MI,USA,42.21206,-83.34884,Delta Air Lines Inc.,2015,7,1,3,2021,N328NW,PHX,1740,1739,-1,20,1759,248,252,228,1671,1847,4,1848,1851,3,0,0,,,,,,    
## Louis Armstrong New Orleans International Airport,New Orleans,LA,USA,29.99339,-90.25803,Southwest Airlines Co.,2015,2,26,4,121,N247WN,FLL,1845,1844,-1,12,1856,105,104,87,674,2123,5,2130,2128,-2,0,0,,,,,,  
## Philadelphia International Airport,Philadelphia,PA,USA,39.87195,-75.24114,American Airlines Inc.,2015,11,9,1,1733,N935UW,CLT,1525,1522,-3,14,1536,105,99,79,449,1655,6,1710,1701,-9,0,0,,,,,,    
## NA,NA,NA,NA,NA,American Eagle Airlines Inc.,Delta Air Lines Inc.,2015,10,15,4,1603,N363NW,13487,0510,0507,-3,14,0521,62,54,35,223,0556,5,0612,0601,-11,0,0,,,,,, 
## Tampa International Airport,Tampa,FL,USA,27.97547,-82.53325,Delta Air Lines Inc.,2015,12,11,5,2137,N698DL,ATL,0720,0716,-4,30,0746,98,111,72,406,0858,9,0858,0907,9,0,0,,,,,,    
## Tampa International Airport,Tampa,FL,USA,27.97547,-82.53325,Southwest Airlines Co.,2015,3,8,7,572,N632SW,PVD,1410,1413,3,10,1423,165,157,144,1136,1647,3,1655,1650,-5,0,0,,,,,,  
## Ronald Reagan Washington National Airport,Arlington,VA,USA,38.85208,-77.03772,American Airlines Inc.,2015,7,26,7,2119,N952UW,BOS,1330,1324,-6,15,1339,82,83,63,399,1442,5,1452,1447,-5,0,0,,,,,, 
## Nashville International Airport,Nashville,TN,USA,36.12448,-86.67818,American Airlines Inc.,2015,6,2,2,2348,N480AA,DFW,0730,0725,-5,21,0746,125,120,85,631,0911,14,0935,0925,-10,0,0,,,,,,    

Imagen del jps del clúster

Nota: dadas las capacidades técnicas del tercer equipo que quisimos unir al clúster lo tuvimos que eliminar en el proceso de Map-Reduce pues generaba errores de memoria. Sin embargo, como se puede observar en la primera parte de la tarea, si logramos levantar el YARN con tres equipos.

Master

Slave

En este se muestran los procesos que corren durante la ejecución del Map-Reduce en el esclavo.

Imagen del localhost:8088 seleccionando la opción de nodos:

Imagen del localhost:8088 seleccionando la opción FINISHED para ver que los jobs corrieron correctamente

Producto de la línea de comando mostrando el mapreduce exitoso: